14f4d6f2b172d4a6fc0f5fd27f90cd3f9cd7d69f,test-framework/PurchaseAnalytics/src/main/java/com/continuuity/testsuite/purchaseanalytics/appdriver/PurchaseAnalyticsAppDriver.java,PurchaseAnalyticsAppDriver,main,#String[]#,51
Before Change
// Run map reduce jobs.
appDriver.startComponent("PurchaseHistoryBuilder", "mapreduce");
appDriver.startComponent("PurchaseStatsBuilder", "mapreduce");
appDriver.startComponent("RegionBuilder", "mapreduce");
// TODO: Wait for status on MR jobs, correlate result from MR job
After Change
appDriver.startComponent("PurchaseAnalyticsFlow", "flow");
// check that flow started properly
if (appDriver.checkComponentStatus("PurchaseAnalyticsFlow", "flow") != "RUNNING") {
LOG.error(String.format("Unable to start PurchaseAnalytics flow."));
return;
}
// Randomly generates n number of transactions.
appDriver.generateTransactions();
Customer returnedCustomer = null;
// Verify that all customers streamed have been inserted
for (Customer customer : appDriver.helper.getCustomers()) {
if (!appDriver.customerExists(customer)) {
LOG.error("Customer not found: " + gson.toJson(customer));
}
}
for (Product product : appDriver.helper.getProducts()) {
if (!appDriver.productExists(product)) {
LOG.error("Customer not found: " + gson.toJson(product));
}
}
try {
// Run map reduce jobs.
appDriver.startComponent("PurchaseHistoryBuilder", "mapreduce");
// pool for for completion
while (appDriver.checkComponentStatus("PurchaseHistoryBuilder", "mapreduce") == "RUNNING") {
Thread.sleep(500);
}
appDriver.startComponent("PurchaseStatsBuilder", "mapreduce");
// pool for for completion
while (appDriver.checkComponentStatus("PurchaseStatsBuilder", "mapreduce") == "RUNNING") {
Thread.sleep(500);
}
appDriver.startComponent("RegionBuilder", "mapreduce");
// pool for for completion
while (appDriver.checkComponentStatus("PurchaseStatsBuilder", "mapreduce") == "RUNNING") {
Thread.sleep(500);
}
} catch (InterruptedException ex) {
LOG.error(ex.getLocalizedMessage());
}